15 FastAPI WebSocket 服务端搭建与 Vue3 客户端封装
FastAPI WebSocket 服务端搭建与 Vue3 客户端封装
关联:索引
要解决的问题
- 为什么“能连上”不代表“能用”:连接一多就混乱、断开不清理、消息格式不可控
- 服务端如何管理连接:单连接/多连接的差异是什么,何时需要广播
- 消息为什么要解析:如何用统一 JSON 格式让服务端更好扩展与排错
- 如何快速验证接口:不写完整前端也能测通连接与收发
- 前端如何把 WebSocket 变成“可复用能力”:组合式函数封装、响应式状态绑定、组件内使用规范
章节内容(本讲核心):
- FastAPI WebSocket 服务端进阶:在“最小可用回显服务”基础上做连接管理与广播
- 服务端消息解析与响应:复用上一课的 JSON Envelope 思路,按 type 分支处理
- 服务启动与接口测试:以“增量改造 + 快速回归验证”为主(避免重复讲启动细节)
- Vue3 客户端封装:把上一课的 WebSocketAdvanced.vue 抽成 useWebSocket 组合式函数
- 响应式状态绑定:复用上一课 readyState 映射与“仅 OPEN 才能发送”的规则
- 页面实时更新:用响应式数据驱动 UI(最新数据 + 消息流),形成可复用模板
- 组件内使用规范:连接生命周期、重复连接处理、卸载清理、错误可视化
与前置知识衔接(避免重复):
-
已学(上节课与案例):客户端生命周期事件(open/message/error/close)、readyState 状态映射、统一 JSON Envelope、最简心跳与测试点
-
本讲不重复:readyState 状态机细讲、Envelope 字段逐项解释、客户端事件完整回顾、心跳原理
-
本讲复用:
02_websocket_client_advanced中的src/utils/ws.ts(buildMessage/mapReadyState)与WebSocketAdvanced.vue的收发/日志思路 -
本讲定位:围绕“服务端连接管理与广播”和“前端封装为 useWebSocket”,把上节课的 Demo 提升为可复用工程能力
-
生成 FastAPI WebSocket 服务端模板(连接管理 + 广播 + JSON 解析)
-
生成 Vue3 useWebSocket 封装代码(响应式状态 + 发送拦截 + 清理)
-
AI:输出模板与关键函数(ConnectionManager / useWebSocket)
-
学生:把模板落地到项目、联调自测、对运行结果负责
- 没有连接列表:只能服务“当前一个”,无法广播、无法统计在线
- 不清理断开的连接:广播时会报错,导致整个循环中断
- 不解析消息:业务扩展靠字符串拼接,排错困难、协议难升级
- 连接池:用一个结构统一保存在线连接(list/set 都可),不要把 ws 藏在局部变量里
- 断开清理:客户端关闭时会触发断开异常,必须从连接池剔除,否则广播会越来越慢/越来越容易报错
- 广播容错:广播给多人时,某个连接 send 失败不应影响其他连接;失败连接要标记并清理
- 解析路由:复用上一课 Envelope 思路,只做“必要字段”判断,然后按 type 分支处理(ping/chat/broadcast)
项目目标:不从零新建项目,直接基于 02_websocket_client_advanced/server/app.py 升级出“连接管理 + 广播 + 解析路由”。
推荐文件路径:02_websocket_client_advanced/server/app.py
# 标准库:负责 JSON 序列化与时间戳
import json
import time
from typing import Any
# FastAPI:WebSocket 端点与断开异常
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
def now_ms() -> int:
# 统一毫秒时间戳:用于消息 id/ts,便于排查“消息先后顺序”
return int(time.time() * 1000)
def make_msg(msg_type: str, payload: Any, msg_id: str | None = None) -> dict[str, Any]:
# 统一 Envelope:type/id/ts/payload,前后端按同一结构解析与渲染
return {"type": msg_type, "id": msg_id or str(now_ms()), "ts": now_ms(), "payload": payload}
def try_parse_json(text: str) -> dict[str, Any] | None:
# “降级解析”:不是 JSON 就返回 None,避免服务端因为解析失败而中断连接
try:
data = json.loads(text)
except Exception:
return None
# 只接受对象结构(dict),数组/字符串等一律视为无效协议
return data if isinstance(data, dict) else None
class ConnectionManager:
def __init__(self) -> None:
# 用 set 保存连接:自动去重;连接对象本身可作为集合元素
self._connections: set[WebSocket] = set()
async def connect(self, ws: WebSocket) -> None:
# WebSocket 必须先 accept,后续才能收发
await ws.accept()
self._connections.add(ws)
def disconnect(self, ws: WebSocket) -> None:
# 断开时从连接池剔除:避免广播遍历到“死连接”
self._connections.discard(ws)
async def send_json(self, ws: WebSocket, data: dict[str, Any]) -> None:
# 统一用文本帧发送 JSON(前端 WebSocket.onmessage 收到 string)
await ws.send_text(json.dumps(data, ensure_ascii=False))
async def broadcast_json(self, data: dict[str, Any]) -> None:
# 广播容错:某个连接发送失败,不影响其他连接
dead: list[WebSocket] = []
for ws in self._connections:
try:
await self.send_json(ws, data)
except Exception:
dead.append(ws)
# 广播结束后统一清理失败连接,避免影响本次遍历
for ws in dead:
self.disconnect(ws)
manager = ConnectionManager()
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
# 连接建立:加入连接池,并给客户端一个“可见证据”(system)
await manager.connect(ws)
await manager.send_json(ws, make_msg("system", {"info": "connected"}))
try:
while True:
# 等待客户端下一条文本帧(这里以 text 为主;二进制不在本讲范围)
text = await ws.receive_text()
data = try_parse_json(text)
if data is None:
# 客户端没按协议发 JSON:服务端降级为“原样回显”,用于快速联调
await manager.send_json(ws, make_msg("chat", {"echo": text}))
continue
# 从 Envelope 中取必要字段:type/id/payload
t = data.get("type")
msg_id = data.get("id")
payload = data.get("payload")
if t == "ping":
# 心跳:用同一个 msg_id 回 pong,前端方便对上是哪次 ping
await manager.send_json(ws, make_msg("pong", {}, msg_id))
elif t == "chat":
# chat:回显 payload(这里不做业务校验,先让联调闭环)
await manager.send_json(ws, make_msg("chat", {"echo": payload}, msg_id))
elif t == "broadcast":
# broadcast:把消息广播给所有在线连接(包括触发者)
await manager.broadcast_json(make_msg("chat", {"from": "server", "payload": payload}))
else:
# 未知 type:返回 system ok,避免直接报错断开
await manager.send_json(ws, make_msg("system", {"info": "ok"}))
except WebSocketDisconnect:
# 客户端正常断开:务必清理连接池
manager.disconnect(ws)
except Exception:
# 其他异常:也要清理,保证连接池不会越来越脏
manager.disconnect(ws)
要点固化(只记“新增”):
- 新增 1:
ConnectionManager统一管理连接,断开时要剔除 - 新增 2:广播要容错,发送失败不能“带崩”循环
- 新增 3:解析只做“够用的判断”,按 type 路由即可;Envelope 细节复用上节课
启动命令(复用上一课):
uvicorn app:app --reload --port 8000
测试策略(侧重“增量改造后的回归验证”):
- 只做最小手工测试即可:能连上、能发、能收、断开能清理(避免重复讲工具细节)
- 多个页面都要连接:复制粘贴,改一处漏三处
- 状态判断散落:OPEN 才能 send 的规则容易遗漏
- 生命周期遗漏:页面切换后连接没关、定时器没停
- 日志不可追:消息收发没有统一格式,排错靠猜
本节只强调“新增的封装边界”,不重复讲 readyState 与事件机制:
- 复用:readyState 映射、OPEN 发送拦截、消息日志思路(来自上一课 utils/ws.ts 与 WebSocketAdvanced.vue)
- 新增:把连接、事件绑定、清理、发送统一放进 composable,组件只做 UI 与业务触发
- 建议返回(最小够用):
status、logs/messages、connect/disconnect、send(内部做 OPEN 判断)
项目目标:基于上一课的 02_websocket_client_advanced,把组件里的连接逻辑抽到 useWebSocket,组件只保留 UI + 业务参数。
推荐文件路径:02_websocket_client_advanced/client/02_websocket_client_advanced/src/composables/useWebSocket.ts
import { onBeforeUnmount, ref } from 'vue'
import { buildMessage, mapReadyState } from '../utils/ws'
import type { ReadyStateText } from '../utils/ws'
export function useWebSocket() {
// 连接状态(文本化):用于 UI 直接展示
const status = ref<ReadyStateText>('CLOSED')
// 收发日志:用于“证据可见”,联调时非常关键
const logs = ref<string[]>([])
// 当前 WebSocket 实例:connect/disconnect 会更新它
let ws: WebSocket | null = null
function log(s: string) {
logs.value.push(s)
}
function refreshStatus() {
// 把原生 readyState 映射成可读文本
status.value = mapReadyState(ws ?? undefined)
}
function connect(url: string) {
// 避免重复连接:先断开旧连接,再建新连接
if (ws) disconnect()
ws = new WebSocket(url)
refreshStatus()
ws.onopen = () => {
refreshStatus()
log('✅ connected')
}
ws.onmessage = (e) => {
// 这里先记录“原始字符串”,后续需要再做结构化解析
log(`📥 ${typeof e.data === 'string' ? e.data : String(e.data)}`)
}
ws.onerror = () => {
refreshStatus()
log('❌ error')
}
ws.onclose = () => {
refreshStatus()
log('❌ closed')
}
}
function disconnect() {
if (!ws) return
// CONNECTING / OPEN 才需要 close;CLOSING/CLOSED 不重复操作
if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) ws.close()
ws = null
refreshStatus()
}
function sendChat(text: string) {
// 输入清理:避免空消息
const t = text.trim()
if (!t) return false
// 只允许 OPEN 才 send:避免报错与“发了但其实没发出去”
if (!ws || ws.readyState !== WebSocket.OPEN) return false
// 统一按 Envelope 发送:服务端按 type 分支处理
ws.send(JSON.stringify(buildMessage('chat', { text: t })))
log(`📤 ${t}`)
return true
}
function sendPing() {
if (!ws || ws.readyState !== WebSocket.OPEN) return false
// 心跳包:让服务端回 pong,用来验证链路是否稳定
ws.send(JSON.stringify(buildMessage('ping', {})))
return true
}
// 组件卸载时自动断开:避免页面切换后“后台还连着”
onBeforeUnmount(disconnect)
return { status, logs, connect, disconnect, sendChat, sendPing }
}
推荐组件改造示例:02_websocket_client_advanced/client/02_websocket_client_advanced/src/components/WebSocketAdvanced.vue(重构后)
<template>
<div>
<h3>WebSocket(封装后)</h3>
<!-- 连接状态:联调用的第一证据 -->
<div>状态:{{ status }}</div>
<input v-model="msg" type="text" placeholder="输入消息" />
<button @click="send">发送</button>
<button @click="sendPing">ping</button>
<button @click="disconnect">断开</button>
<!-- 日志区:联调与排错的第二证据(收发原始数据) -->
<div style="margin-top: 12px; white-space: pre-line;">{{ logs.join('\\n') }}</div>
</div>
</template>
<script setup lang="ts">
import { onMounted, ref } from 'vue'
import { useWebSocket } from '../composables/useWebSocket'
// 从 composable 获取“连接能力”,组件只做 UI 与交互
const { status, logs, connect, disconnect, sendChat, sendPing } = useWebSocket()
const msg = ref('')
// 页面加载即连接:课堂 demo 更直观;真实项目可改成按钮触发
onMounted(() => connect('ws://localhost:8000/ws'))
function send() {
// sendChat 内部已做 OPEN 判断与空字符串拦截
if (sendChat(msg.value)) msg.value = ''
}
</script>
要点固化:
- useWebSocket 是“能力封装”,组件只负责 UI 与业务触发
- 发送必须拦截:只允许 OPEN 才 send,避免未就绪发送报错
- 生命周期要清理:组件卸载时断开连接,避免资源泄漏与重复连接
| 测试点 | 操作与验证 | 预期结果 |
|---|---|---|
| 服务端启动 | 启动 uvicorn | 终端无报错,ws 接口可用 |
| 客户端连接 | 打开页面(mounted 自动连接) | status 变为 OPEN,logs 出现 ✅ connected |
| 文本消息收发 | 输入文本并发送 | logs 出现回包内容(chat echo) |
| ping/pong(选做) | 点击 ping | logs 出现 pong(或服务端回包) |
| broadcast(选做) | 另开 2 个页面并触发广播 | 其他页面也收到推送,且断开页面不会带崩广播 |
| 断开清理 | 点击断开/关闭页面 | status 变为 CLOSED,连接释放(服务端连接池清理) |
作业(布置)
- 在 Vue3 中完成 WebSocket 封装,实现连接、发送、接收(useWebSocket)
- 实现实时数据展示与连接状态显示(至少包含状态文本与消息列表/最新数据)
- 提供运行截图与封装说明(说明包含:返回的状态/方法、OPEN 发送拦截、卸载清理策略)